<?xml version="1.0" encoding="utf-8" standalone="no"?>
<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.1//EN"
  "http://www.w3.org/TR/xhtml11/DTD/xhtml11.dtd">

<html xmlns="http://www.w3.org/1999/xhtml">
<head>
  <meta content="$Id: riak.html 155 2011-05-25 15:38:44Z gvwilson $" name="provenance" />
  <link href="../Styles/stylesheet.css" rel="stylesheet" type="text/css" />

  <title>The Architecture of Open Source Applications: Riak and Erlang/OTP</title>
</head>

<body>
  <div class="header">
    <h1 class="chaptitle" id="heading_id_2">Chapter 15. Riak and Erlang/OTP</h1>

    <h1 class="chapterauthor" id="heading_id_3"><a href="../Text/intro.html#cesarini-francesco">Francesco Cesarini</a>, <a href="../Text/intro.html#gross-andy">Andy Gross</a>, and <a href="../Text/intro.html#sheehy-justin">Justin Sheehy</a></h1>
  </div>

  <p>Riak is a distributed, fault-tolerant, open source database that illustrates how to build large-scale systems using Erlang/OTP. Thanks in large part to Erlang's support for massively scalable distributed systems, Riak offers features that are uncommon in databases, such as high availability and linear scalability of both capacity and throughput.</p>

  <p>Erlang/OTP provides an ideal platform for developing systems like Riak because it provides inter-node communication, message queues, failure detectors, and client-server abstractions out of the box. What's more, most frequently-used patterns in Erlang have been implemented in library modules, commonly referred to as OTP behaviors. They contain the generic code framework for concurrency and error handling, simplifying concurrent programming and protecting the developer from many common pitfalls. Behaviors are monitored by supervisors, themselves a behavior, and grouped together in supervision trees. A supervision tree is packaged in an application, creating a building block of an Erlang program.</p>

  <p>A complete Erlang system such as Riak is a set of loosely coupled applications that interact with each other. Some of these applications have been written by the developer, some are part of the standard Erlang/OTP distribution, and some may be other open source components. They are sequentially loaded and started by a boot script generated from a list of applications and versions.</p>

  <p>What <em>differs</em> among systems is the set of applications in the release that is started. In the standard Erlang distribution, the boot files will start the <em>Kernel</em> and <em>StdLib</em> (Standard Library) applications. In some installations, the <em>SASL</em> (System Architecture Support Libraries) application is also started. SASL contains release and software upgrade tools together with logging capabilities. Riak is no different, other than starting the Riak-specific applications as well as their runtime dependencies, which include <em>Kernel</em>, <em>StdLib</em> and <em>SASL</em>. A complete and ready-to-run build of Riak actually embeds these standard elements of the Erlang/OTP distribution and starts them all in unison when <code>riak start</code> is invoked on the command line. Riak consists of many complex applications, so this chapter should not be interpreted as a complete guide. It should be seen as an introduction to OTP where examples from the Riak source code are used. The figures and examples have been abbreviated and shortened for demonstration purposes.</p>

  <div class="sect">
    <h2 id="heading_id_4">15.1. An Abridged Introduction to Erlang</h2>

    <p>Erlang is a concurrent functional programming language that compiles to byte code and runs in a virtual machine. Programs consist of functions that call each other, often resulting in side effects such as inter-process message passing, I/O and database operations. Erlang variables are single assignment, i.e., once they have been given values, they cannot be updated. The language makes extensive use of pattern matching, as shown in the factorial example below:</p>
    <pre>
-module(factorial).
-export([fac/1]).
fac(0) -&gt; 1;
fac(N) when N&gt;0 -&gt;
   Prev = fac(N-1),
   N*Prev.
</pre>

    <p class="continue">Here, the first clause gives the factorial of zero, the second factorials of positive numbers. The body of each clause is a sequence of expressions, and the final expression in the body is the result of that clause. Calling the function with a negative number will result in a run time error, as none of the clauses match. Not handling this case is an example of non-defensive programming, a practice encouraged in Erlang.</p>

    <p>Within the module, functions are called in the usual way; outside, the name of the module is prepended, as in <code>factorial:fac(3)</code>. It is possible to define functions with the same name but different numbers of arguments; the number of arguments a function takes is called its <em>arity</em>. In the export directive in the <code>factorial</code> module the <code>fac</code> function of arity one is denoted by <code>fac/1</code>.</p>
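
    <p>As a minimal sketch of arity, the hypothetical module below (<code>factorial_acc</code> and its accumulator argument are illustrative, not taken from Riak) defines both <code>fac/1</code> and <code>fac/2</code>. The two share a name but are distinct functions; only <code>fac/1</code> is exported, and it calls <code>fac/2</code> from within the module without a module prefix:</p>
    <pre>
-module(factorial_acc).
-export([fac/1]).

%% fac/1 is the public entry point; it delegates to fac/2.
fac(N) when is_integer(N), N &gt;= 0 -&gt;
    fac(N, 1).

%% fac/2 carries the running product in Acc. It is not exported,
%% so it cannot be called from outside this module.
fac(0, Acc) -&gt; Acc;
fac(N, Acc) -&gt; fac(N - 1, N * Acc).
</pre>

    <p class="continue">From the shell or another module, <code>factorial_acc:fac(3)</code> evaluates to <code>6</code>.</p>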

    <p>Erlang supports tuples (also called product types) and lists. Tuples are enclosed in curly brackets, as in <code>{ok,37}</code>. In tuples, we access elements by position. Records are another data type; they allow us to store a fixed number of elements which are then accessed and manipulated by name. We define a record with a directive such as <code>-record(state, {id, msg_list=[]}).</code> To create an instance, we use the expression <code>Var = #state{id=1}</code>, and we examine its contents using <code>Var#state.id</code>. For a variable number of elements, we use lists, written in square brackets as in <code>[23,34]</code>. The notation <code>[X|Xs]</code> matches a non-empty list with head <code>X</code> and tail <code>Xs</code>. Identifiers beginning with a lower case letter denote atoms, which simply stand for themselves; the <code>ok</code> in the tuple <code>{ok,37}</code> is an example of an atom. Atoms are often used in this way to distinguish between different kinds of function result: as well as <code>ok</code> results, there might be results of the form <code>{error, "Error String"}</code>.</p>
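
    <p>The data types above can be tried out together in a small sketch. The module name <code>data_demo</code> and its function names are assumptions made for illustration; only the <code>state</code> record definition comes from the text:</p>
    <pre>
-module(data_demo).
-export([new/1, add_msg/2, sum/1, describe/1]).

-record(state, {id, msg_list=[]}).

%% Create a record; msg_list takes its default value [].
new(Id) -&gt; #state{id=Id}.

%% Records are "updated" by building a modified copy.
add_msg(Msg, S) -&gt; S#state{msg_list=[Msg | S#state.msg_list]}.

%% [X|Xs] matches a non-empty list with head X and tail Xs.
sum([]) -&gt; 0;
sum([X|Xs]) -&gt; X + sum(Xs).

%% Atoms tag the kind of result carried by a tuple.
describe({ok, Value}) -&gt; {value, Value};
describe({error, Reason}) -&gt; {failed, Reason}.
</pre>

    <p class="continue">For example, <code>data_demo:sum([23,34])</code> evaluates to <code>57</code>, and <code>data_demo:describe({ok,37})</code> to <code>{value,37}</code>.</p>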

    <p>Processes in Erlang systems run concurrently in separate memory, and communicate with each other by message passing. Processes can be used for a wealth of applications, including gateways to databases, as handlers for protocol stacks, and to manage the logging of trace messages from other processes. Although these processes handle different requests, there will be similarities in how these requests are handled.</p>

    <p>As processes exist only within the virtual machine, a single VM can simultaneously run millions of processes, a feature Riak exploits extensively. For example, each request to the database—reads, writes, and deletes—is modeled as a separate process, an approach that would not be possible with most OS-level threading implementations.</p>

    <p>Processes are identified by process identifiers, called PIDs, but they can also be registered under an alias; this should only be used for long-lived "static" processes. Registering a process with its alias allows other processes to send it messages without knowing its PID. Processes are created using the <code>spawn(Module, Function, Arguments)</code> built-in function (BIF). BIFs are functions integrated in the VM and used to do what is impossible or slow to execute in pure Erlang. The <code>spawn/3</code> BIF takes a <code>Module</code>, a <code>Function</code> and a list of <code>Arguments</code> as parameters. The call returns the PID of the newly spawned process and as a side effect, creates a new process that starts executing the function in the module with the arguments mentioned earlier.</p>

    <p>A message <code>Msg</code> is sent to a process with process id <code>Pid</code> using <code>Pid ! Msg</code>. A process can find out its PID by calling the BIF <code>self</code>, and this can then be sent to other processes for them to use to communicate with the original process. Suppose that a process expects to receive messages of the form <code>{ok, N}</code> and <code>{error, Reason}</code>. To process these it uses a receive statement:</p>
    <pre>
receive
   {ok, N} -&gt;
      N+1;
   {error, _} -&gt;
      0
end
</pre>

    <p class="continue">The result of this is a number determined by the pattern-matched clause. When the value of a variable is not needed in the pattern match, the underscore wild-card can be used as shown above.</p>
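
    <p>Spawning, registration, and message passing can be combined in a short sketch. The module <code>spawn_demo</code>, the alias <code>echo</code>, and the message formats are hypothetical names chosen for this example:</p>
    <pre>
-module(spawn_demo).
-export([start/0, loop/0]).

%% Spawn a process running spawn_demo:loop() and register it
%% under the alias echo. Returns the PID of the new process.
start() -&gt;
    Pid = spawn(spawn_demo, loop, []),
    register(echo, Pid),
    Pid.

%% Answer each {From, Msg} with {echo, Msg}, then wait for the next one.
loop() -&gt;
    receive
        {From, Msg} -&gt;
            From ! {echo, Msg},
            loop()
    end.
</pre>

    <p class="continue">After calling <code>spawn_demo:start()</code>, a client can evaluate <code>echo ! {self(), hello}</code> and then wait for <code>{echo, hello}</code> in a <code>receive</code> expression, without ever knowing the PID of the echo process.</p>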

    <p>Message passing between processes is asynchronous, and the messages received by a process are placed in the process's mailbox in the order in which they arrive. Suppose that now the <code>receive</code> expression above is to be executed: if the first element in the mailbox is either <code>{ok, N}</code> or <code>{error, Reason}</code> the corresponding result will be returned. If the first message in the mailbox is not of this form, it is retained in the mailbox and the second is processed in a similar way. If no message matches, the receive will wait for a matching message to be received.</p>
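
    <p>The mailbox behavior described above can be observed with the following sketch, in which a process sends three messages to itself. The module name <code>mailbox_demo</code> and the <code>{note, first}</code> message are illustrative assumptions:</p>
    <pre>
-module(mailbox_demo).
-export([run/0]).

run() -&gt;
    self() ! {note, first},       % matches neither pattern below
    self() ! {ok, 41},
    self() ! {error, boom},
    %% {note, first} is skipped and retained in the mailbox;
    %% {ok, 41} is the first message that matches.
    R1 = receive
             {ok, N} -&gt; N + 1;
             {error, _} -&gt; 0
         end,
    %% The next matching message is {error, boom}.
    R2 = receive
             {ok, M} -&gt; M + 1;
             {error, _} -&gt; 0
         end,
    %% The skipped message is still waiting in the mailbox.
    R3 = receive {note, Note} -&gt; Note end,
    {R1, R2, R3}.
</pre>

    <p class="continue">Calling <code>mailbox_demo:run()</code> returns <code>{42,0,first}</code>: the unmatched message is neither lost nor allowed to block the messages queued behind it.</p>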

    <p>Processes terminate for one of two reasons. If there is no more code to execute, they are said to terminate with reason <em>normal</em>. If a process encounters a run-time error, it is said to terminate with a <em>non-normal</em> reason. A process terminating will not affect other processes unless they are linked to it. Processes can link to each other through the <code>link(Pid)</code> BIF or by being spawned with <code>spawn_link(Module, Function, Arguments)</code>. When a process terminates, it sends an EXIT signal to the processes in its link set. If the termination reason is non-normal, each process that receives the signal terminates as well, propagating the EXIT signal further. By calling the <code>process_flag(trap_exit, true)</code> BIF, processes can receive the EXIT signals as Erlang messages in their mailbox instead of terminating.</p>
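
    <p>A minimal sketch of trapping exits follows. The module name <code>link_demo</code> and the exit reason <code>crashed</code> are made up for the example, and it uses the one-argument form of <code>spawn_link</code> that takes a fun rather than the three-argument form mentioned above:</p>
    <pre>
-module(link_demo).
-export([run/0]).

%% Trap exits, spawn a linked process that terminates abnormally,
%% and return the reason carried by the resulting 'EXIT' message.
run() -&gt;
    Old = process_flag(trap_exit, true),
    Pid = spawn_link(fun() -&gt; exit(crashed) end),
    Reason = receive
                 {'EXIT', Pid, R} -&gt; R
             after 1000 -&gt;
                 timeout
             end,
    %% Restore the caller's previous trap_exit setting.
    process_flag(trap_exit, Old),
    Reason.
</pre>

    <p class="continue">Here <code>link_demo:run()</code> returns <code>crashed</code>. Without the call to <code>process_flag/2</code>, the calling process would have been terminated by the EXIT signal instead.</p>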

    <p>Riak uses EXIT signals to monitor the well-being of helper processes performing non-critical work initiated by the request-driving finite state machines. When these helper processes terminate abnormally, the EXIT signal allows the parent to either ignore the error or restart the process.</p>
  </div>

  <div class="sect">
    <h2 id="heading_id_5">15.2. Process Skeletons</h2>

    <p>We previously introduced the notion that processes follow a common pattern regardless of the particular purpose for which the process was created. To start off, a process has to be spawned and then, optionally, have its alias registered. The first action of the newly spawned process is to initialize the process loop data. The loop data is often the result of arguments passed to the <code>spawn</code> built-in function at the initialization of the process. Its loop data is stored in a variable we refer to as the process state. The state, often stored in a record, is passed to a receive-evaluate function, running a loop which receives a message, handles it, updates the state, and passes it back as an argument to a tail-recursive call. If one of the messages it handles is a "stop" message, the receiving process will clean up after itself and then terminate.</p>
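
    <p>The skeleton can be sketched in plain Erlang as follows. This is an illustration under assumed names (the module and alias <code>counter_proc</code>, and the messages <code>increment</code>, <code>value</code>, and <code>stop</code>), not code from Riak:</p>
    <pre>
-module(counter_proc).
-export([start/1, increment/0, value/0, stop/0, init/1]).

%% Spawn the process and register it under the alias counter_proc.
start(Initial) -&gt;
    register(counter_proc, spawn(counter_proc, init, [Initial])).

%% Initialize the loop data, then enter the receive-evaluate loop.
init(Initial) -&gt;
    loop(Initial).

loop(Count) -&gt;
    receive
        increment -&gt;
            loop(Count + 1);          % updated state, tail-recursive call
        {value, From} -&gt;
            From ! {counter_proc, Count},
            loop(Count);
        stop -&gt;
            ok                        % nothing to clean up; terminate
    end.

%% Client functions hide the message protocol from callers.
increment() -&gt; counter_proc ! increment, ok.
stop() -&gt; counter_proc ! stop, ok.
value() -&gt;
    counter_proc ! {value, self()},
    receive {counter_proc, Count} -&gt; Count after 1000 -&gt; timeout end.
</pre>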

    <p>This is a recurring theme among processes that will occur regardless of the task the process has been assigned to perform. With this in mind, let's look at the differences between the processes that conform to this pattern:</p>

    <ul>
      <li>The arguments passed to the <code>spawn</code> BIF calls will differ from one process to another.</li>

      <li>You have to decide whether you should register a process under an alias, and if you do, what alias should be used.</li>

      <li>In the function that initializes the process state, the actions taken will differ based on the tasks the process will perform.</li>

      <li>The state of the system is represented by the loop data in every case, but the contents of the loop data will vary among processes.</li>

      <li>When in the body of the receive-evaluate loop, processes will receive different messages and handle them in different ways.</li>

      <li>Finally, on termination, the cleanup will vary from process to process.</li>
    </ul>

    <p>So, even if a skeleton of generic actions exists, these actions are complemented by specific ones that are directly related to the tasks assigned to the process. Using this skeleton as a template, programmers can create Erlang processes that act as servers, finite state machines, event handlers and supervisors. But instead of re-implementing these patterns every time, they have been placed in library modules referred to as behaviors. They come as part of the OTP middleware.</p>
  </div>

  <div class="sect">
    <h2 id="heading_id_6">15.3. OTP Behaviors</h2>

    <p>The core team of developers committing to Riak is spread across nearly a dozen geographical locations. Without very tight coordination and templates to work from, the result would consist of different client/server implementations that fail to handle special borderline cases and concurrency-related errors. There would probably be no uniform way to handle client and server crashes, or to guarantee that a response to a request is indeed the response, and not just any message that conforms to the internal message protocol.</p>

    <p>OTP is a set of Erlang libraries and design principles providing ready-made tools with which to develop robust systems. Many of these patterns and libraries are provided in the form of "behaviors."</p>

    <p>OTP behaviors address these issues by providing library modules that implement the most common concurrent design patterns. Behind the scenes, without the programmer having to be aware of it, the library modules ensure that errors and special cases are handled in a consistent way. As a result, OTP behaviors provide a set of standardized building blocks used in designing and building industrial-grade systems.</p>

    <div class="subsect">
      <h3 id="heading_id_7">15.3.1. Introduction</h3>

      <p>OTP behaviors are provided as library modules in the <code>stdlib</code> application which comes as part of the Erlang/OTP distribution. The specific code, written by the programmer, is placed in a separate module and called through a set of predefined callback functions standardized for each behavior. This callback module will contain all of the specific code required to deliver the desired functionality.</p>

      <p>OTP behaviors include worker processes, which do the actual processing, and supervisors, whose task is to monitor workers and other supervisors. Worker behaviors, often denoted in diagrams as circles, include servers, event handlers, and finite state machines. Supervisors, denoted in illustrations as squares, monitor their children, both workers and other supervisors, creating what is called a supervision tree.</p>

      <div class="figure" id="fig.erlang.supervision">
        <img alt="[OTP Riak Supervision Tree]" src="../Images/supervision-tree.png" />

        <p>Figure&nbsp;15.1: OTP Riak Supervision Tree</p>
      </div>

      <p>Supervision trees are packaged into a behavior called an application. OTP applications are not only the building blocks of Erlang systems, but are also a way to package reusable components. Industrial-grade systems like Riak consist of a set of loosely coupled, possibly distributed applications. Some of these applications are part of the standard Erlang distribution and some are the pieces that make up the specific functionality of Riak.</p>

      <p>Examples of OTP applications include the Corba ORB or the Simple Network Management Protocol (SNMP) agent. An OTP application is a reusable component that packages library modules together with supervisor and worker processes. From now on, when we refer to an application, we will mean an OTP application.</p>

      <p>The behavior modules contain all of the generic code for each given behavior type. Although it is possible to implement your own behavior module, doing so is rare because the ones that come with the Erlang/OTP distribution will cater to most of the design patterns you would use in your code. The generic functionality provided in a behavior module includes operations such as:</p>

      <ul>
        <li>spawning and possibly registering the process;</li>

        <li>sending and receiving client messages as synchronous or asynchronous calls, including defining the internal message protocol;</li>

        <li>storing the loop data and managing the process loop; and</li>

        <li>stopping the process.</li>
      </ul>

      <p>The loop data is a variable that will contain the data the behavior needs to store in between calls. After the call, an updated variant of the loop data is returned. This updated loop data, often referred to as the new loop data, is passed as an argument in the next call. Loop data is also often referred to as the behavior state.</p>

      <p>For a generic server, the specific functionality that the callback module must supply includes the following:</p>

      <ul>
        <li>Initializing the process loop data, and, if the process is registered, the process name.</li>

        <li>Handling the specific client requests, and, if synchronous, the replies sent back to the client.</li>

        <li>Handling and updating the process loop data in between the process requests.</li>

        <li>Cleaning up the process loop data upon termination.</li>
      </ul>
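
      <p>To make this division of labor concrete, here is a minimal sketch of a callback module for the <code>gen_server</code> behavior, which the following sections cover in detail. The module <code>kv_server</code> and its <code>store/2</code> and <code>lookup/1</code> functions are hypothetical; the generic parts (spawning, the message protocol, the loop) live in the <code>gen_server</code> library module:</p>
      <pre>
-module(kv_server).
-behavior(gen_server).
%% API
-export([start_link/0, store/2, lookup/1]).
%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
         terminate/2, code_change/3]).

start_link() -&gt; gen_server:start_link({local, kv_server}, kv_server, [], []).
store(Key, Value) -&gt; gen_server:call(kv_server, {store, Key, Value}).
lookup(Key) -&gt; gen_server:call(kv_server, {lookup, Key}).

%% Initialize the loop data: an empty list of {Key, Value} pairs.
init([]) -&gt; {ok, []}.

%% Handle client requests; return the reply and the new loop data.
handle_call({store, Key, Value}, _From, Data) -&gt;
    {reply, ok, lists:keystore(Key, 1, Data, {Key, Value})};
handle_call({lookup, Key}, _From, Data) -&gt;
    {reply, proplists:get_value(Key, Data), Data}.

handle_cast(_Msg, Data) -&gt; {noreply, Data}.
handle_info(_Info, Data) -&gt; {noreply, Data}.

%% Clean up on termination (nothing to release in this sketch).
terminate(_Reason, _Data) -&gt; ok.
code_change(_OldVsn, Data, _Extra) -&gt; {ok, Data}.
</pre>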
    </div>

    <div class="subsect">
      <h3 id="heading_id_8">15.3.2. Generic Servers</h3>

      <p>Generic servers that implement client/server behaviors are defined in the <code>gen_server</code> behavior that comes as part of the standard library application. In explaining generic servers, we will use the <code>riak_core_node_watcher.erl</code> module from the <code>riak_core</code> application. It is a server that tracks and reports on which sub-services and nodes in a Riak cluster are available. The module headers and directives are as follows:</p>
      <pre>
-module(riak_core_node_watcher).
-behavior(gen_server).
%% API
-export([start_link/0,service_up/2,service_down/1,node_up/0,node_down/0,services/0,
         services/1,nodes/1,avsn/0]).
%% gen_server callbacks
-export([init/1,handle_call/3,handle_cast/2,handle_info/2,terminate/2, code_change/3]).

-record(state, {status=up, services=[], peers=[], avsn=0, bcast_tref,
                bcast_mod={gen_server, abcast}}).
</pre>

      <p>We can easily recognize generic servers through the <code>-behavior(gen_server).</code> directive. The compiler uses this directive to check that all callback functions are exported, warning about any that are missing. The <code>state</code> record is used for the server loop data.</p>
    </div>

    <div class="subsect">
      <h3 id="heading_id_9">15.3.3. Starting Your Server</h3>

      <p>With the <code>gen_server</code> behavior, instead of using the <code>spawn</code> and <code>spawn_link</code> BIFs, you will use the <code>gen_server:start</code> and <code>gen_server:start_link</code> functions. The main difference between <code>spawn</code> and <code>start</code> is the synchronous nature of the call. Using <code>start</code> instead of <code>spawn</code> makes starting the worker process more deterministic and prevents unforeseen race conditions, as the call will not return the PID of the worker until it has been initialized. You call the functions with either of:</p>
      <pre>
gen_server:start_link(ServerName, CallbackModule, Arguments, Options)
gen_server:start_link(CallbackModule, Arguments, Options)
</pre>

      <p class="continue"><code>ServerName</code> is a tuple of the format <code>{local, Name}</code> or <code>{global, Name}</code>, denoting a local or global <code>Name</code> for the process alias if it is to be registered. Global names allow servers to be transparently accessed across a cluster of distributed Erlang nodes. If you do not want to register the process and instead reference it using its PID, you omit the argument and use a <code>start_link/3</code> or <code>start/3</code> function call instead. <code>CallbackModule</code> is the name of the module in which the specific callback functions are placed, <code>Arguments</code> is a valid Erlang term that is passed to the <code>init/1</code> callback function, while <code>Options</code> is a list that allows you to set, among other things, memory management flags such as <code>fullsweep_after</code> and <code>min_heap_size</code> (through the <code>spawn_opt</code> option), as well as other tracing and debugging flags.</p>

      <p>In our example, we call <code>start_link/4</code>, registering the process with the same name as the callback module, using the <code>?MODULE</code> macro call. This macro is expanded to the name of the module it is defined in by the preprocessor when compiling the code. It is always good practice to name your behavior with an alias that is the same as the callback module it is implemented in. We don't pass any arguments, and as a result, just send the empty list. The options list is kept empty:</p>
      <pre>
start_link() -&gt;
    gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
</pre>

      <p class="continue">The obvious difference between the <code>start_link</code> and <code>start</code> functions is that <code>start_link</code> links to its parent, most often a supervisor, while <code>start</code> doesn't. This needs a special mention as it is an OTP behavior's responsibility to link itself to the supervisor. The <code>start</code> functions are often used when testing behaviors from the shell, as a typing error causing the shell process to crash would not affect the behavior. On success, all variants of the <code>start</code> and <code>start_link</code> functions return <code>{ok, Pid}</code>.</p>

      <p>The <code>start</code> and <code>start_link</code> functions will spawn a new process that calls the <code>init(Arguments)</code> callback function in the <code>CallbackModule</code>, with the <code>Arguments</code> supplied. The <code>init</code> function must initialize the <code>LoopData</code> of the server and has to return a tuple of the format <code>{ok, LoopData}</code>. <code>LoopData</code> contains the first instance of the loop data that will be passed between the callback functions. If you want to store some of the arguments you passed to the <code>init</code> function, you would do so in the <code>LoopData</code> variable. The <code>LoopData</code> in the Riak node watcher server is the result of the <code>schedule_broadcast/1</code> called with a record of type <code>state</code> where the fields are set to the default values:</p>
      <pre>
init([]) -&gt;

    %% Watch for node up/down events
    net_kernel:monitor_nodes(true),

    %% Setup ETS table to track node status
    ets:new(?MODULE, [protected, named_table]),

    {ok, schedule_broadcast(#state{})}.
</pre>

      <p>Although the supervisor process might call the <code>start_link/4</code> function, a different process calls the <code>init/1</code> callback: the one that was just spawned. As the purpose of this server is to notice, record, and broadcast the availability of sub-services within Riak, the initialization asks the Erlang runtime to notify it of such events, and sets up a table to store this information in. This needs to be done during initialization, as any calls to the server would fail if that structure did not yet exist. Do only what is necessary and minimize the operations in your <code>init</code> function, as the call to <code>init</code> is a synchronous call that prevents all of the other serialized processes from starting until it returns.</p>
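
      <p>The point that <code>init/1</code> runs in the newly spawned process, and that <code>start</code> returns only once it has completed, can be checked with a small sketch. The module <code>init_demo</code> and its <code>init_pid/1</code> function are assumptions made for this example:</p>
      <pre>
-module(init_demo).
-behavior(gen_server).
-export([start/0, init_pid/1]).
-export([init/1, handle_call/3, handle_cast/2]).

%% Start an unregistered, unlinked server.
%% The call returns only after init/1 has run.
start() -&gt; gen_server:start(init_demo, [], []).

%% Ask the server which process executed init/1.
init_pid(Server) -&gt; gen_server:call(Server, init_pid).

%% Runs in the newly spawned process, so self() is the server's PID.
%% That PID is stored as the loop data.
init([]) -&gt; {ok, self()}.

handle_call(init_pid, _From, InitPid) -&gt; {reply, InitPid, InitPid}.
handle_cast(_Msg, State) -&gt; {noreply, State}.
</pre>

      <p class="continue">After <code>{ok, Pid} = init_demo:start()</code>, the call <code>init_demo:init_pid(Pid)</code> returns that same <code>Pid</code>, which differs from the PID of the caller.</p>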
    </div>

    <div class="subsect">
      <h3 id="heading_id_10">15.3.4. Passing Messages</h3>

      <p>If you want to send a synchronous message to your server, you use the <code>gen_server:call/2</code> function. Asynchronous calls are made using the <code>gen_server:cast/2</code> function. Let's start by taking two functions from Riak's service API; we will provide the rest of the code later. They are called by the client process and result in a synchronous message being sent to the server process registered with the same name as the callback module. Note that validating the data sent to the server should occur on the client side. If the client sends incorrect information, the server should terminate.</p>
      <pre>
service_up(Id, Pid) -&gt;
    gen_server:call(?MODULE, {service_up, Id, Pid}).

service_down(Id) -&gt;
    gen_server:call(?MODULE, {service_down, Id}).
</pre>

      <p class="continue">Upon receiving the messages, the <code>gen_server</code> process calls the <code>handle_call/3</code> callback function dealing with the messages in the same order in which they were sent:</p>
      <pre>
handle_call({service_up, Id, Pid}, _From, State) -&gt;
    %% Update the set of active services locally
    Services = ordsets:add_element(Id, State#state.services),
    S2 = State#state { services = Services },

    %% Remove any existing mrefs for this service
    delete_service_mref(Id),

    %% Setup a monitor for the Pid representing this service
    Mref = erlang:monitor(process, Pid),
    erlang:put(Mref, Id),
    erlang:put(Id, Mref),

    %% Update our local ETS table and broadcast
    S3 = local_update(S2),
    {reply, ok, update_avsn(S3)};

handle_call({service_down, Id}, _From, State) -&gt;
    %% Update the set of active services locally
    Services = ordsets:del_element(Id, State#state.services),
    S2 = State#state { services = Services },

    %% Remove any existing mrefs for this service
    delete_service_mref(Id),

    %% Update local ETS table and broadcast
    S3 = local_update(S2),
    {reply, ok, update_avsn(S3)};
</pre>

      <p class="continue">Note the return value of the callback function. The tuple contains the control atom <code>reply</code>, telling the <code>gen_server</code> generic code that the second element of the tuple (which in both of these cases is the atom <code>ok</code>) is the reply sent back to the client. The third element of the tuple is the new <code>State</code>, which, in a new iteration of the server, is passed as the third argument to the <code>handle_call/3</code> function; in both cases here it is updated to reflect the new set of available services. The argument <code>_From</code> is a tuple containing a unique message reference and the client process identifier. The tuple as a whole is used in library functions that we will not be discussing in this chapter. In the majority of cases, you will not need it.</p>

      <p>The <code>gen_server</code> library module has a number of mechanisms and safeguards built in that operate behind the scenes. If your client sends a synchronous message to your server and you do not get a response within five seconds, the process executing the <code>call/2</code> function is terminated. You can override this by using <code>gen_server:call(Name, Message, Timeout)</code> where <code>Timeout</code> is a value in milliseconds or the atom <code>infinity</code>.</p>
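
      <p>The following sketch shows the three-argument form of the call with an explicit timeout. The module <code>slow_server</code> and its <code>work/3</code> function are hypothetical, and the example catches the resulting exit only to make the timeout visible; ordinarily the caller would be left to terminate:</p>
      <pre>
-module(slow_server).
-behavior(gen_server).
-export([start/0, work/3]).
-export([init/1, handle_call/3, handle_cast/2]).

start() -&gt; gen_server:start(slow_server, [], []).

%% Ask the server to sleep for Millis milliseconds, waiting at most
%% Timeout milliseconds for the reply.
work(Server, Millis, Timeout) -&gt;
    try gen_server:call(Server, {sleep, Millis}, Timeout)
    catch exit:{timeout, _} -&gt; timed_out
    end.

init([]) -&gt; {ok, nostate}.

handle_call({sleep, Millis}, _From, State) -&gt;
    timer:sleep(Millis),
    {reply, done, State}.

handle_cast(_Msg, State) -&gt; {noreply, State}.
</pre>

      <p class="continue">With a server <code>S</code>, <code>slow_server:work(S, 10, 1000)</code> returns <code>done</code>, while <code>slow_server:work(S, 300, 50)</code> returns <code>timed_out</code> because the reply does not arrive within 50 milliseconds.</p>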

      <p>The timeout mechanism was originally put in place for deadlock prevention purposes, ensuring that servers that accidentally call each other are terminated after the default timeout. The crash report would be logged, and hopefully would result in the error being debugged and fixed. Most applications will function appropriately with a timeout of five seconds, but under very heavy loads, you might have to fine-tune the value and possibly even use <code>infinity</code>; this choice is application-dependent. All of the critical code in Erlang/OTP uses <code>infinity</code>. Various places in Riak use different values for the timeout: <code>infinity</code> is common between coupled pieces of the internals, while <code>Timeout</code> is set based on a user-passed parameter in cases where the client code talking to Riak has specified that an operation should be allowed to time out.</p>

      <p>Other safeguards when using the <code>gen_server:call/2</code> function include the case of sending a message to a nonexistent server and the case of a server crashing before sending its reply. In both cases, the calling process will terminate. In raw Erlang, sending a message that is never pattern-matched in a receive clause is a bug that can cause a memory leak. Two different strategies are used in Riak to mitigate this, both of which involve "catchall" matching clauses. In places where the message might be user-initiated, an unmatched message might be silently discarded. In places where such a message could only come from Riak's internals, it represents a bug and so will be used to trigger an error-alerting internal crash report, restarting the worker process that received it.</p>

      <p>Sending asynchronous messages works in a similar way. Messages are sent asynchronously to the generic server and handled in the <code>handle_cast/2</code> callback function. The function has to return a tuple of the format <code>{noreply, NewState}</code>. Asynchronous calls are used when we are not interested in the reply from the server and are not worried about producing more messages than the server can consume. In cases where we are not interested in a response but want to wait until the message has been handled before sending the next request, we would use a <code>gen_server:call/2</code>, returning the atom <code>ok</code> in the reply. Picture a process generating database entries at a faster rate than Riak can consume. By using asynchronous calls, we risk filling up the process mailbox and making the node run out of memory. Riak uses the message-serializing properties of synchronous <code>gen_server</code> calls to regulate load, processing the next request only when the previous one has been handled. This approach eliminates the need for more complex throttling code: in addition to enabling concurrency, <code>gen_server</code> processes can also be used to introduce serialization points.</p>
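
      <p>The contrast between the two styles can be sketched with a hypothetical <code>tally</code> server (the module and its function names are illustrative) that accepts the same request either as a cast or as a call:</p>
      <pre>
-module(tally).
-behavior(gen_server).
-export([start/0, add/2, add_sync/2, total/1]).
-export([init/1, handle_call/3, handle_cast/2]).

start() -&gt; gen_server:start(tally, [], []).

%% Asynchronous: returns ok immediately, with no guarantee that
%% the message has been handled yet.
add(Server, N) -&gt; gen_server:cast(Server, {add, N}).

%% Synchronous: returns ok only after the server has handled the
%% request, so a fast producer is slowed to the server's pace.
add_sync(Server, N) -&gt; gen_server:call(Server, {add, N}).

total(Server) -&gt; gen_server:call(Server, total).

init([]) -&gt; {ok, 0}.

%% Casts produce no reply, only new loop data.
handle_cast({add, N}, Total) -&gt; {noreply, Total + N}.

handle_call({add, N}, _From, Total) -&gt; {reply, ok, Total + N};
handle_call(total, _From, Total) -&gt; {reply, Total, Total}.
</pre>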
    </div>

    <div class="subsect">
      <h3 id="heading_id_11">15.3.5. Stopping the Server</h3>

      <p>How do you stop the server? In your <code>handle_call/3</code> and <code>handle_cast/2</code> callback functions, instead of returning <code>{reply, Reply, NewState}</code> or <code>{noreply, NewState}</code>, you can return <code>{stop, Reason, Reply, NewState}</code> or <code>{stop, Reason, NewState}</code>, respectively. Something has to trigger this return value, often a stop message sent to the server. Upon receiving the stop tuple containing the <code>Reason</code> and <code>State</code>, the generic code executes the <code>terminate(Reason, State)</code> callback.</p>
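
      <p>A minimal sketch of a stop request handled in <code>handle_call/3</code> follows. The module <code>stoppable</code> and the <code>{terminated, Reason}</code> notification are assumptions added so that the call to <code>terminate/2</code> can be observed from outside:</p>
      <pre>
-module(stoppable).
-behavior(gen_server).
-export([start/0, stop/1]).
-export([init/1, handle_call/3, handle_cast/2, terminate/2]).

%% The loop data is the PID of the process that started the server.
start() -&gt; gen_server:start(stoppable, [self()], []).

%% Synchronous stop request; returns the server's reply.
stop(Server) -&gt; gen_server:call(Server, stop).

init([Owner]) -&gt; {ok, Owner}.

%% Stop with reason normal; ok is the reply sent to the caller.
handle_call(stop, _From, Owner) -&gt;
    {stop, normal, ok, Owner}.

handle_cast(_Msg, Owner) -&gt; {noreply, Owner}.

%% Invoked by the generic code with the Reason and State
%% taken from the stop tuple.
terminate(Reason, Owner) -&gt;
    Owner ! {terminated, Reason},
    ok.
</pre>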

      <p>The <code>terminate</code> function is the natural place to insert the code needed to clean up the <code>State</code> of the server and any other persistent data used by the system. In our example, we send out one last message to our peers so that they know that this node watcher is no longer up and watching. In this example, the variable <code>State</code> contains a record with the fields <code>status</code> and <code>peers</code>:</p>
      <pre>
terminate(_Reason, State) -&gt;
    %% Let our peers know that we are shutting down
    broadcast(State#state.peers, State#state { status = down }).
</pre>

      <p>Use of the behavior callbacks as library functions and invoking them from other parts of your program is an extremely bad practice. For example, you should never call <code>riak_core_node_watcher:init(Args)</code> from another module to retrieve the initial loop data. Such retrievals should be done through a synchronous call to the server. Calls to behavior callback functions should originate only from the behavior library modules as a result of an event occurring in the system, and never directly by the user.</p>
    </div>
  </div>

  <div class="sect">
    <h2 id="heading_id_12">15.4. Other Worker Behaviors</h2>

    <p>A large number of other worker behaviors can be, and have been, implemented using these same ideas.</p>

    <div class="subsect">
      <h3 id="heading_id_13">15.4.1. Finite State Machines</h3>

      <p>Finite state machines (FSMs), implemented in the <code>gen_fsm</code> behavior module, are a crucial component when implementing protocol stacks in telecom systems (the problem domain Erlang was originally invented for). States are defined as callback functions named after the state that return a tuple containing the next <code>State</code> and the updated loop data. You can send events to these states synchronously and asynchronously. The finite state machine callback module should also export the standard callback functions such as <code>init</code>, <code>terminate</code>, and <code>handle_info</code>.</p>

      <p>Of course, finite state machines are not telecom specific. In Riak, they are used in the request handlers. When a client issues a request such as <code>get</code>, <code>put</code>, or <code>delete</code>, the process listening to that request will spawn a process implementing the corresponding <code>gen_fsm</code> behavior. For instance, the <code>riak_kv_get_fsm</code> is responsible for handling a <code>get</code> request, retrieving data and sending it out to the client process. The FSM process will pass through various states as it determines which nodes to ask for the data, as it sends out messages to those nodes, and as it receives data, errors, or timeouts in response.</p>
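
      <p>The following toy FSM is a sketch of the idea, not the actual <code>riak_kv_get_fsm</code>. It assumes a hypothetical module <code>get_fsm_sketch</code> that waits in the state <code>waiting</code> until a given number of replies has arrived and then moves to the state <code>done</code>. Note that newer Erlang/OTP releases favor <code>gen_statem</code>, so compiling against <code>gen_fsm</code> may produce deprecation warnings:</p>
      <pre>
-module(get_fsm_sketch).
-behavior(gen_fsm).

%% Hypothetical client API
-export([start_link/1, reply/2, result/1]).
%% gen_fsm callbacks; waiting/2,3 and done/2,3 are the state functions
-export([init/1, waiting/2, waiting/3, done/2, done/3,
         handle_event/3, handle_sync_event/4, handle_info/3,
         terminate/3, code_change/4]).

%% Needed is the number of replies to wait for.
start_link(Needed) -&gt;
    gen_fsm:start_link(?MODULE, Needed, []).

%% Asynchronous event: a (simulated) reply from a node arrives.
reply(Fsm, Value) -&gt;
    gen_fsm:send_event(Fsm, {reply, Value}).

%% Synchronous event: ask for the outcome so far.
result(Fsm) -&gt;
    gen_fsm:sync_send_event(Fsm, result).

init(Needed) -&gt;
    {ok, waiting, {Needed, []}}.

%% State 'waiting': collect replies until enough have arrived.
waiting({reply, Value}, {Needed, Acc}) when length(Acc) + 1 &gt;= Needed -&gt;
    {next_state, done, {Needed, [Value | Acc]}};
waiting({reply, Value}, {Needed, Acc}) -&gt;
    {next_state, waiting, {Needed, [Value | Acc]}}.

waiting(result, _From, Data) -&gt;
    {reply, pending, waiting, Data}.

%% State 'done': ignore late replies and answer result queries.
done({reply, _Late}, Data) -&gt;
    {next_state, done, Data}.

done(result, _From, {_Needed, Acc} = Data) -&gt;
    {reply, {ok, lists:reverse(Acc)}, done, Data}.

handle_event(_Event, StateName, Data) -&gt; {next_state, StateName, Data}.
handle_sync_event(_Event, _From, StateName, Data) -&gt; {reply, ok, StateName, Data}.
handle_info(_Info, StateName, Data) -&gt; {next_state, StateName, Data}.
terminate(_Reason, _StateName, _Data) -&gt; ok.
code_change(_OldVsn, StateName, Data, _Extra) -&gt; {ok, StateName, Data}.
</pre>

      <p class="continue">Each state function returns the name of the next state together with the updated loop data, which is how the process moves from <code>waiting</code> to <code>done</code>.</p>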
    </div>

    <div class="subsect">
      <h3 id="heading_id_14">15.4.2. Event Handlers</h3>

      <p>Event handlers and managers are another behavior implemented in the <code>gen_event</code> library module. The idea is to create a centralized point that receives events of a specific kind. Events can be sent synchronously and asynchronously with a predefined set of actions being applied when they are received. Possible responses to events include logging them to file, sending off an alarm in the form of an SMS, or collecting statistics. Each of these actions is defined in a separate callback module with its own loop data, preserved between calls. Handlers can be added, removed, or updated for every specific event manager. So, in practice, for every event manager there could be many callback modules, and different instances of these callback modules could exist in different managers. Event handlers include processes receiving alarms, live trace data, equipment related events or simple logs.</p>
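
      <p>As a small illustration of a handler that collects statistics, the sketch below counts every event it receives. The module name <code>event_counter</code> and the <code>count</code> request are assumptions made for this example:</p>
      <pre>
-module(event_counter).
-behavior(gen_event).

-export([init/1, handle_event/2, handle_call/2]).

%% The loop data is the number of events seen so far.
init(Start) -&gt;
    {ok, Start}.

%% Called for every event sent to the manager this handler is added to.
handle_event(_Event, Count) -&gt;
    {ok, Count + 1}.

%% Called for requests addressed to this handler specifically.
handle_call(count, Count) -&gt;
    {ok, Count, Count}.
</pre>

      <p class="continue">The handler could then be added to an event manager and exercised as follows, with <code>notify</code> sending an event asynchronously and <code>sync_notify</code> synchronously:</p>
      <pre>
{ok, Mgr} = gen_event:start_link(),
ok = gen_event:add_handler(Mgr, event_counter, 0),
ok = gen_event:notify(Mgr, {alarm, disk_full}),
ok = gen_event:sync_notify(Mgr, {alarm, cleared}),
2 = gen_event:call(Mgr, event_counter, count).
</pre>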

      <p>One of the uses for the <code>gen_event</code> behavior in Riak is for managing subscriptions to "ring events", i.e., changes to the membership or partition assignment of a Riak cluster. Processes on a Riak node can register a function in an instance of <code>riak_core_ring_events</code>, which implements the <code>gen_event</code> behavior. Whenever the central process managing the ring for that node changes the membership record for the overall cluster, it fires off an event that causes each of those callback modules to call the registered function. In this fashion, it is easy for various parts of Riak to respond to changes in one of Riak's most central data structures without having to add complexity to the central management of that structure.</p>

      <p>Most common concurrency and communication patterns are handled with the three primary behaviors we've just discussed: <code>gen_server</code>, <code>gen_fsm</code>, and <code>gen_event</code>. However, in large systems, some application-specific patterns emerge over time that warrant the creation of new behaviors. Riak includes one such behavior, <code>riak_core_vnode</code>, which formalizes how virtual nodes are implemented. Virtual nodes are the primary storage abstraction in Riak, exposing a uniform interface for key-value storage to the request-driving FSMs. The interface for callback modules is specified using the <code>behavior_info/1</code> function, as follows:</p>
      <pre>
behavior_info(callbacks) -&gt;
    [{init,1},
     {handle_command,3},
     {handoff_starting,2},
     {handoff_cancelled,1},
     {handoff_finished,2},
     {handle_handoff_command,3},
     {handle_handoff_data,2},
     {encode_handoff_item,2},
     {is_empty,1},
     {terminate,2},
     {delete,1}];
</pre>

      <p class="continue">The above example shows the <code>behavior_info/1</code> function from <code>riak_core_vnode</code>. (In OTP itself the function is spelled <code>behaviour_info/1</code>; only the <code>-behavior</code> directive accepts both spellings.) The list of <code>{CallbackFunction, Arity}</code> tuples defines the contract that callback modules must follow. Concrete virtual node implementations must export these functions, or the compiler will emit a warning. Implementing your own OTP behaviors is relatively straightforward. Besides defining the callback functions, you need to start the behavior's processes with the functions provided by the <code>proc_lib</code> module, handle system messages with the help of the <code>sys</code> module, and monitor the parent in case it terminates.</p>
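
      <p>To make the idea of a callback contract concrete, the following sketch performs at run time a check similar in spirit to the one the compiler performs: given a module and a list of <code>{CallbackFunction, Arity}</code> tuples, it reports which callbacks the module fails to export. The module <code>contract_check</code> is a hypothetical helper, not part of OTP or Riak:</p>
      <pre>
-module(contract_check).
-export([missing_callbacks/2]).

%% Returns the {Function, Arity} pairs from Callbacks that Module
%% does not export. An empty list means the contract is satisfied.
missing_callbacks(Module, Callbacks) -&gt;
    Exports = Module:module_info(exports),
    [CB || CB &lt;- Callbacks, not lists:member(CB, Exports)].
</pre>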
    </div>
  </div>

  <div class="sect">
    <h2 id="heading_id_15">15.5. Supervisors</h2>

    <p>The supervisor behavior's task is to monitor its children and, based on some preconfigured rules, take action when they terminate. The children that make up a supervision tree can be both supervisors and worker processes; workers are OTP behaviors such as <code>gen_fsm</code>, <code>gen_server</code>, and <code>gen_event</code>. Delegating recovery to supervisors lets the Riak codebase focus on the correct case, while software bugs, corrupt data, and system errors are handled in a consistent way across the whole system. In the Erlang world, this non-defensive programming approach is often referred to as the "let it crash" strategy. The Riak team, not having to handle borderline error cases, gets to work with a smaller code base. This code base, because of its use of behaviors, is smaller to start off with, as it contains only application-specific code. Riak has a top-level supervisor like most Erlang applications, and also has sub-supervisors for groups of processes with related responsibilities. Examples include Riak's virtual nodes, TCP socket listeners, and query-response managers.</p>

    <div class="subsect">
      <h3 id="heading_id_16">15.5.1. Supervisor Callback Functions</h3>

      <p>To demonstrate how the supervisor behavior is implemented, we will use the <code>riak_core_sup.erl</code> module. The Riak core supervisor is the top level supervisor of the Riak core application. It starts a set of static workers and supervisors, together with a dynamic number of workers handling the HTTP and HTTPS bindings of the node's RESTful API defined in application specific configuration files. As with <code>gen_server</code> callback modules, all supervisor callback modules must include the <code>-behavior(supervisor).</code> directive. Supervisors are started using the <code>supervisor:start_link</code> function, which takes the optional <code>ServerName</code>, the <code>CallBackModule</code>, and an <code>Argument</code> which is passed to the <code>init/1</code> callback function.</p>

      <p>Looking at the first few lines of code in the <code>riak_core_sup.erl</code> module, alongside the behavior directive and a macro we will describe later, we notice the <code>start_link/0</code> function, which calls <code>supervisor:start_link/3</code>:</p>
      <pre>
-module(riak_core_sup).
-behavior(supervisor).
%% API
-export([start_link/0]).
%% Supervisor callbacks
-export([init/1]).
-define(CHILD(I, Type), {I, {I, start_link, []}, permanent, 5000, Type, [I]}).
start_link() -&gt;
    supervisor:start_link({local, ?MODULE}, ?MODULE, []).
</pre>

      <p class="continue">Starting a supervisor will result in a new process being spawned, and the <code>init/1</code> callback function being called in the callback module <code>riak_core_sup.erl</code>. The <code>ServerName</code> is a tuple of the format <code>{local, Name}</code> or <code>{global, Name}</code>, where <code>Name</code> is the supervisor's registered name. In our example, both the registered name and the callback module are the atom <code>riak_core_sup</code>, originating from the <code>?MODULE</code> macro. We pass the empty list as an argument to <code>init/1</code>, treating it as a null value. The <code>init</code> function is the only supervisor callback function. It has to return a tuple with format:</p>
      <pre>
{ok,  {SupervisorSpecification, ChildSpecificationList}}
</pre>

      <p class="continue">where <code>SupervisorSpecification</code> is a 3-tuple <code>{RestartStrategy, AllowedRestarts, MaxSeconds}</code> containing information on how to handle process crashes and restarts. <code>RestartStrategy</code> is one of three values determining how a child's siblings are affected when that child terminates abnormally:</p>

      <ul>
        <li><code>one_for_one</code>: other processes in the supervision tree are not affected.</li>

        <li><code>rest_for_one</code>: processes started after the terminating process are terminated and restarted.</li>

        <li><code>one_for_all</code>: all processes are terminated and restarted.</li>
      </ul>

      <p><code>AllowedRestarts</code> states how many times any of the supervisor children may terminate in <code>MaxSeconds</code> before the supervisor terminates itself (and its children). When a child terminates, it sends an EXIT signal to its supervisor which, based on its restart strategy, handles the termination accordingly. The supervisor terminating after reaching the maximum allowed restarts ensures that cyclic restarts and other issues that cannot be resolved at this level are escalated. Chances are that the issue is in a process located in a different sub-tree, allowing the supervisor receiving the escalation to terminate the affected sub-tree and restart it.</p>
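
      <p>The interplay between <code>AllowedRestarts</code> and <code>MaxSeconds</code> can be sketched with a small pure function. This is a simplified model written for this chapter, not the code used inside the OTP supervisor; the module name <code>restart_intensity</code> and the representation of crashes as timestamps in seconds are assumptions:</p>
      <pre>
-module(restart_intensity).
-export([exceeds/3]).

%% CrashTimes is a list of crash timestamps in seconds, oldest first.
%% Returns true if, at some point, more than AllowedRestarts crashes
%% fall within a window of MaxSeconds, i.e. the supervisor would give up.
exceeds(CrashTimes, AllowedRestarts, MaxSeconds) -&gt;
    exceeds(CrashTimes, [], AllowedRestarts, MaxSeconds).

exceeds([], _Recent, _Max, _Period) -&gt;
    false;
exceeds([T | Rest], Recent, Max, Period) -&gt;
    %% Keep only the earlier crashes that are still inside the window.
    InWindow = [T | [R || R &lt;- Recent, T - R =&lt; Period]],
    case length(InWindow) &gt; Max of
        true -&gt; true;
        false -&gt; exceeds(Rest, InWindow, Max, Period)
    end.
</pre>

      <p class="continue">With three allowed restarts in ten seconds, four crashes one second apart exceed the limit, while four crashes twenty seconds apart do not.</p>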

      <p>Examining the last line of the <code>init/1</code> callback function in the <code>riak_core_sup.erl</code> module, we notice that this particular supervisor has a one-for-one strategy, meaning that the processes are independent of each other. The supervisor will allow a maximum of ten restarts within ten seconds before terminating itself.</p>

      <p><code>ChildSpecificationList</code> specifies which children the supervisor has to start and monitor, together with information on how to terminate and restart them. It consists of a list of tuples of the following format:</p>
      <pre>
{Id, {Module, Function, Arguments}, Restart, Shutdown, Type, ModuleList}
</pre>

      <p><code>Id</code> is a unique identifier for that particular child within the supervisor. <code>Module</code>, <code>Function</code>, and <code>Arguments</code> identify an exported function which results in the behavior's <code>start_link</code> function being called, returning a tuple of the format <code>{ok, Pid}</code>. The <code>Restart</code> strategy dictates what happens when the process terminates, depending on the type of the process, which can be:</p>

      <ul>
        <li><code>temporary</code> processes, which are never restarted;</li>

        <li><code>transient</code> processes, which are restarted only if they terminate abnormally; and</li>

        <li><code>permanent</code> processes, which are always restarted, regardless of the termination being normal or abnormal.</li>
      </ul>

      <p><code>Shutdown</code> is a value in milliseconds referring to the time the behavior is allowed to execute in the <code>terminate</code> function when terminating as the result of a restart or shutdown. The atom <code>infinity</code> can also be used, but for behaviors other than supervisors, it is highly discouraged. <code>Type</code> is either the atom <code>worker</code>, referring to the generic servers, event handlers and finite state machines, or the atom <code>supervisor</code>. Together with <code>ModuleList</code>, a list of modules implementing the behavior, they are used to control and suspend processes during the runtime software upgrade procedures. Only existing or user implemented behaviors may be part of the child specification list and hence included in a supervision tree.</p>
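
      <p>Putting the supervisor specification and a child specification together, a minimal supervisor might look like the sketch below. It is not taken from Riak: the module name <code>sketch_sup</code> and the registered name <code>sketch_events</code> are invented, and the only child is a standard <code>gen_event</code> manager so that the example needs no additional worker module. For event managers, the <code>ModuleList</code> element is the atom <code>dynamic</code>, since the set of handler modules is not known in advance:</p>
      <pre>
-module(sketch_sup).
-behavior(supervisor).

-export([start_link/0]).
-export([init/1]).

start_link() -&gt;
    supervisor:start_link({local, ?MODULE}, ?MODULE, []).

init([]) -&gt;
    %% one_for_one: siblings are unaffected when a child terminates.
    %% Give up after more than 3 restarts within 10 seconds.
    SupSpec = {one_for_one, 3, 10},
    %% {Id, {Module, Function, Arguments}, Restart, Shutdown, Type, ModuleList}
    Child = {sketch_events,
             {gen_event, start_link, [{local, sketch_events}]},
             permanent, 5000, worker, dynamic},
    {ok, {SupSpec, [Child]}}.
</pre>

      <p class="continue">Because the child is <code>permanent</code>, killing the event manager causes the supervisor to start a fresh one under the same <code>Id</code>.</p>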

      <p>With this knowledge at hand, we should now be able to formulate a restart strategy defining inter-process dependencies, fault tolerance thresholds and escalation procedures based on a common architecture. We should also be able to understand what is going on in the <code>init/1</code> example of the <code>riak_core_sup.erl</code> module. First of all, study the <code>CHILD</code> macro. It creates the child specification for one child, using the callback module name as <code>Id</code>, making it permanent and giving it a shutdown time of 5 seconds. The child type can be either worker or supervisor. Have a look at the example, and see what you can make out of it:</p>
      <pre>
-define(CHILD(I, Type), {I, {I, start_link, []}, permanent, 5000, Type, [I]}).

init([]) -&gt;
    RiakWebs = case lists:flatten(riak_core_web:bindings(http),
                                  riak_core_web:bindings(https)) of
                   [] -&gt;
                       %% check for old settings, in case app.config
                       %% was not updated
                       riak_core_web:old_binding();
                   Binding -&gt;
                       Binding
               end,

    Children =
                 [?CHILD(riak_core_vnode_sup, supervisor),
                  ?CHILD(riak_core_handoff_manager, worker),
                  ?CHILD(riak_core_handoff_listener, worker),
                  ?CHILD(riak_core_ring_events, worker),
                  ?CHILD(riak_core_ring_manager, worker),
                  ?CHILD(riak_core_node_watcher_events, worker),
                  ?CHILD(riak_core_node_watcher, worker),
                  ?CHILD(riak_core_gossip, worker) |
                  RiakWebs
                 ],
    {ok, {{one_for_one, 10, 10}, Children}}.
</pre>

      <p>Most of the <code>Children</code> started by this supervisor are statically defined workers (or in the case of the <code>vnode_sup</code>, a supervisor). The exception is the <code>RiakWebs</code> portion, which is dynamically defined depending on the HTTP portion of Riak's configuration file.</p>
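
      <p>It can help to see what a single use of the <code>CHILD</code> macro expands to. The sketch below wraps the macro from <code>riak_core_sup.erl</code> in a hypothetical module, <code>child_macro_sketch</code>, whose only purpose is to return one expanded child specification:</p>
      <pre>
-module(child_macro_sketch).
-export([expanded/0]).

%% The macro as defined in riak_core_sup.erl.
-define(CHILD(I, Type), {I, {I, start_link, []}, permanent, 5000, Type, [I]}).

%% Returns the child specification for the gossip worker:
%% {riak_core_gossip, {riak_core_gossip, start_link, []},
%%  permanent, 5000, worker, [riak_core_gossip]}
expanded() -&gt;
    ?CHILD(riak_core_gossip, worker).
</pre>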

      <p>With the exception of library applications, every OTP application, including those in Riak, has its own supervision tree. In Riak, various top-level applications are running in the Erlang node, such as <code>riak_core</code> for distributed systems algorithms, <code>riak_kv</code> for key/value storage semantics, <code>webmachine</code> for HTTP, and more. We have shown the expanded tree under <code>riak_core</code> to demonstrate the multi-level supervision going on. One of the many benefits of this structure is that a given subsystem can crash (due to a bug, an environmental problem, or intentional action) and, in the first instance, only that subtree will be terminated.</p>

      <p>The supervisor will restart the needed processes and the overall system will not be affected. In practice we have seen this work well for Riak. A user might figure out how to crash a virtual node, but it will just be restarted by <code>riak_core_vnode_sup</code>. If they manage to crash that, the <code>riak_core</code> supervisor will restart it, propagating the termination to the top-level supervisor. This failure isolation and recovery mechanism allows Riak (and Erlang) developers to straightforwardly build resilient systems.</p>

      <p>The value of the supervisory model was shown when one large industrial user created a very abusive environment in order to find out where each of several database systems would fall apart. This environment created random huge bursts of both traffic and failure conditions. They were confused when Riak simply wouldn't stop running, even under the worst such arrangement. Under the covers, of course, they were able to make individual processes or subsystems crash in multiple ways—but the supervisors would clean up and restart things to put the whole system back into working order every time.</p>
    </div>

    <div class="subsect">
      <h3 id="heading_id_17">15.5.2. Applications</h3>

      <p>The <code>application</code> behavior we previously introduced is used to package Erlang modules and resources into reusable components. In OTP, there are two kinds of applications. The most common form, called normal applications, will start a supervision tree and all of the relevant static workers. Library applications such as the Standard Library, which come as part of the Erlang distribution, contain library modules but do not start a supervision tree. This is not to say that the code may not contain processes or supervision trees. It just means they are started as part of a supervision tree belonging to another application.</p>

      <p>An Erlang system will consist of a set of loosely coupled applications. Some are written by the developers, some are available as open source, and others are part of the Erlang/OTP distribution. The Erlang runtime system and its tools treat all applications equally, regardless of whether they are part of the Erlang distribution or not.</p>
    </div>
  </div>

  <div class="sect">
    <h2 id="heading_id_18">15.6. Replication and Communication in Riak</h2>

    <p>Riak was designed for extreme reliability and availability at a massive scale, and was inspired by Amazon's Dynamo storage system [<a href="../Text/bibliography.html#bib_amazon_dynamo">DHJ+07</a>]. Dynamo and Riak's architectures combine aspects of both Distributed Hash Tables (DHTs) and traditional databases. Two key techniques that both Riak and Dynamo use are <em>consistent hashing</em> for replica placement and a <em>gossip protocol</em> for sharing common state.</p>

    <p>Consistent hashing requires that all nodes in the system know about each other, and know what partitions each node owns. This assignment data could be maintained in a centrally managed configuration file, but in large configurations, this becomes extremely difficult. Another alternative is to use a central configuration server, but this introduces a single point of failure in the system. Instead, Riak uses a gossip protocol to propagate cluster membership and partition ownership data throughout the system.</p>

    <p>Gossip protocols, also called epidemic protocols, work exactly as they sound. When a node in the system wishes to change a piece of shared data, it makes the change to its local copy of the data and gossips the updated data to a random peer. Upon receiving an update, a node merges the received changes with its local state and gossips again to another random peer.</p>
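
    <p>The merge-and-forward step can be sketched in a few lines. This is a toy model, not Riak's gossip code: it assumes that shared state is a map from keys to <code>{Version, Value}</code> pairs in which the higher version wins, and the module name <code>gossip_sketch</code> and its functions are invented for the example:</p>
    <pre>
-module(gossip_sketch).
-export([merge/2, gossip_once/2]).

%% State is a map Key =&gt; {Version, Value}; the higher version wins.
%% Entries from Remote replace local ones only if they are newer.
merge(Local, Remote) -&gt;
    maps:fold(fun(Key, {RVsn, _} = RemoteEntry, Acc) -&gt;
                      case maps:find(Key, Acc) of
                          {ok, {LVsn, _}} when LVsn &gt;= RVsn -&gt; Acc;
                          _ -&gt; maps:put(Key, RemoteEntry, Acc)
                      end
              end, Local, Remote).

%% Nodes is a map NodeName =&gt; State and must contain at least one
%% node other than From. From gossips its state to one random peer,
%% which merges it into its own. Returns {Peer, UpdatedNodes}.
gossip_once(From, Nodes) -&gt;
    Peers = [N || N &lt;- maps:keys(Nodes), N =/= From],
    Peer = lists:nth(rand:uniform(length(Peers)), Peers),
    Merged = merge(maps:get(Peer, Nodes), maps:get(From, Nodes)),
    {Peer, maps:put(Peer, Merged, Nodes)}.
</pre>

    <p class="continue">Repeating <code>gossip_once/2</code> from whichever node was last updated spreads a change through the simulated cluster one random peer at a time.</p>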

    <p>When a Riak cluster is started, all nodes must be configured with the same partition count. The consistent hashing ring is then divided by the partition count and each interval is stored locally as a <code>{HashRange, Owner}</code> pair. The first node in a cluster simply claims all the partitions. When a new node joins the cluster, it contacts an existing node for its list of <code>{HashRange, Owner}</code> pairs. It then claims (partition count)/(number of nodes) pairs, updating its local state to reflect its new ownership. The updated ownership information is then gossiped to a peer. This updated state is then spread throughout the entire cluster using the above algorithm.</p>
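
    <p>The claiming arithmetic can be illustrated with a deliberately simplified ring. In this sketch each <code>HashRange</code> is represented by an integer partition index, and a joining node takes every Nth partition, where N is the new number of nodes. Riak's real claim logic is more elaborate; the module <code>ring_sketch</code> and its functions are assumptions made for the example:</p>
    <pre>
-module(ring_sketch).
-export([fresh/2, join/2, owned_by/2]).

%% The ring is a list of {Index, Owner} pairs, where Index stands in
%% for a hash range. The first node claims every partition.
fresh(PartitionCount, FirstNode) -&gt;
    [{I, FirstNode} || I &lt;- lists:seq(0, PartitionCount - 1)].

%% NewNode claims (partition count) div (number of nodes) partitions
%% by taking every NumNodes-th index; all other entries are unchanged.
join(Ring, NewNode) -&gt;
    NumNodes = length(lists:usort([O || {_, O} &lt;- Ring])) + 1,
    [case I rem NumNodes =:= NumNodes - 1 of
         true -&gt; {I, NewNode};
         false -&gt; Entry
     end || {I, _} = Entry &lt;- Ring].

%% Lists the partition indexes currently owned by Node.
owned_by(Ring, Node) -&gt;
    [I || {I, O} &lt;- Ring, O =:= Node].
</pre>

    <p class="continue">With eight partitions, a second node joining claims four of them (8 div 2) and a third node claims two (8 div 3).</p>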

    <p>By using a gossip protocol, Riak avoids introducing a single point of failure in the form of a centralized configuration server, relieving system operators from having to maintain critical cluster configuration data. Any node can then use the gossiped partition assignment data in the system to route requests. When used together, the gossip protocol and consistent hashing enable Riak to function as a truly decentralized system, which has important consequences for deploying and operating large-scale systems.</p>
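
    <p>Routing with the gossiped assignment data can be sketched as a lookup from a key to the owner of the partition the key hashes into. The example assumes a ring stored as a list of <code>{Index, Owner}</code> pairs with indexes running from zero, and uses <code>erlang:phash2/2</code> as a stand-in hash function; Riak's own ring uses a far larger hash space, and the module name <code>route_sketch</code> is hypothetical:</p>
    <pre>
-module(route_sketch).
-export([owner/2]).

%% Ring is a list of {Index, Owner} pairs with Index in 0..N-1.
%% Hash the key onto one of the N partitions and return its owner.
owner(Key, Ring) -&gt;
    Index = erlang:phash2(Key, length(Ring)),
    {Index, Owner} = lists:keyfind(Index, 1, Ring),
    Owner.
</pre>

    <p class="continue">Because every node holds the same ring after gossip has converged, any node can evaluate this lookup locally and forward the request without consulting a central server.</p>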
  </div>

  <div class="sect">
    <h2 id="heading_id_19">15.7. Conclusions and Lessons Learned</h2>

    <p>Most programmers believe that smaller and simpler codebases are not only easier to maintain, they often have fewer bugs. By using Erlang's basic distribution primitives for communication in a cluster, Riak can start out with a fundamentally sound asynchronous messaging layer and build its own protocols without having to worry about that underlying implementation. As Riak grew into a mature system, some aspects of its networked communication moved away from use of Erlang's built-in distribution (and toward direct manipulation of TCP sockets) while others remained a good fit for the included primitives. By starting out with Erlang's native message passing for everything, the Riak team was able to build out the whole system very quickly. These primitives are clean and clear enough that it was still easy later to replace the few places where they turned out to not be the best fit in production.</p>

    <p>Also, due to the nature of Erlang messaging and the lightweight core of the Erlang VM, a user can just as easily run 12 nodes on 1 machine or 12 nodes on 12 machines. This makes development and testing much easier when compared to more heavyweight messaging and clustering mechanisms. This has been especially valuable due to Riak's fundamentally distributed nature. Historically, most distributed systems are very difficult to operate in a "development mode" on a single developer's laptop. As a result, developers often end up testing their code in an environment that is a subset of their full system, with very different behavior. Since a many-node Riak cluster can be trivially run on a single laptop without excessive resource consumption or tricky configuration, the development process can more easily produce code that is ready for production deployment.</p>

    <p>The use of Erlang/OTP supervisors makes Riak much more resilient in the face of subcomponent crashes. Riak takes this further; inspired by such behaviors, a Riak cluster is also able to easily keep functioning even when whole nodes crash and disappear from the system. This can lead to a sometimes-surprising level of resilience. One example of this was when a large enterprise was stress-testing various databases and intentionally crashing them to observe their edge conditions. When they got to Riak, they became confused. Each time they found a way (through OS-level manipulation, bad IPC, etc.) to crash a subsystem of Riak, they would see a very brief dip in performance, after which the system returned to normal behavior. This is a direct result of a thoughtful "let it crash" approach. Riak was cleanly restarting each of these subsystems on demand, and the overall system simply continued to function. That experience shows exactly the sort of resilience enabled by Erlang/OTP's approach to building programs.</p>

    <div class="subsect">
      <h3 id="heading_id_20">15.7.1. Acknowledgments</h3>

      <p>This chapter is based on Francesco Cesarini and Simon Thompson's 2009 lecture notes from the Central European Functional Programming School held in Budapest and Komárno. Major contributions were made by Justin Sheehy of Basho Technologies and Simon Thompson of the University of Kent. A special thank you goes to all of the reviewers, who at different stages in the writing of this chapter provided valuable feedback.</p>
    </div>
  </div>

  <div class="footer"></div>
</body>
</html>
